package com.zs.pig.blog.producer;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.sankuai.xm.kafka.client.IProducerProcessor;
import com.sankuai.xm.kafka.client.factory.KafkaProducerBuildFactory;
import com.sankuai.xm.kafka.client.utils.FutureCallback;

/**
 * Spring component that publishes business messages to Kafka asynchronously
 * through an {@link IProducerProcessor}.
 *
 * <p>The underlying producer is built once in {@link #init()}. Send failures
 * reported by the client callback are counted in a process-wide counter and
 * logged together with their cause.
 *
 * Created by likaige on 2016-05-31.
 */
@Component
public class BusinessProducer {

    private static final Logger log = LoggerFactory.getLogger(BusinessProducer.class);

    /** Number of asynchronous sends that have failed since this class was loaded. */
    private static final AtomicLong errorTimes = new AtomicLong(0L);

    /** Kafka producer; stays {@code null} when {@link #init()} could not build it. */
    private IProducerProcessor producer;

    /**
     * Builds the Kafka producer after the bean has been constructed.
     *
     * <p>A build failure is logged (with its cause) but deliberately not rethrown,
     * so the bean is still created; later sends then fail fast with an
     * {@link IllegalStateException}.
     */
    @PostConstruct
    public void init() {
        try {
            producer = KafkaProducerBuildFactory.init();
        } catch (Exception e) {
            // Pass the exception to the logger so the root cause is not lost.
            log.error("build kafka producer failed.", e);
        }
    }

    /**
     * Sends a message to Kafka asynchronously, using the string form of {@code id}
     * as the second argument of {@code sendAsyncMessage} (presumably the message key).
     *
     * @param json message payload bytes
     * @param id   identifier passed to the producer as {@code String.valueOf(id)}
     * @throws IllegalStateException if the producer was not initialised
     * @throws Exception             if the underlying producer rejects the send
     */
    public void sendMessageAsync(final byte[] json, final long id) throws Exception {
        if (producer == null) {
            // init() swallowed a build failure; report that instead of a bare NPE.
            throw new IllegalStateException("kafka producer is not initialized.");
        }
        producer.sendAsyncMessage(json, String.valueOf(id), new FutureCallback() {
            @Override
            public void onSuccess(Object asyncProducerResult) {
                log.info("BusinessProducer send to kafka success.");
            }

            @Override
            public void onFailure(Object msg, Throwable throwable) {
                // Use the value returned by the increment: a separate get() could
                // observe increments made concurrently by other failing sends.
                final long failures = errorTimes.incrementAndGet();
                // Decode the payload for logging; a raw byte[] would be printed as a
                // list of numbers. Assumes the payload is UTF-8 text -- TODO confirm.
                final String payload = json == null ? null : new String(json, StandardCharsets.UTF_8);
                // The trailing Throwable is logged by SLF4J with its stack trace.
                log.error("BusinessProducer send to kafka error, msg={} errorTimes={}",
                        payload, failures, throwable);
            }
        });
    }

}
